flink 自定义触发器 定时或达到数量触发

您所在的位置:网站首页 trigger 官网 flink 自定义触发器 定时或达到数量触发

flink 自定义触发器 定时或达到数量触发

2023-10-24 00:30| 来源: 网络整理| 查看: 265

flink 触发器

触发器确定窗口(由窗口分配程序形成)何时准备由窗口函数处理。每个WindowAssigner都带有一个默认触发器。 如果默认触发器不适合需求,我们就需要自定义触发器。

主要方法

触发器接口有五种方法,允许触发器对不同的事件作出反应

onElement()添加到每个窗口的元素都会调用此方法。 onEventTime()当注册的事件时间计时器触发时,将调用此方法。 onProcessingTime()当注册的处理时间计时器触发时,将调用此方法。 onMerge()与有状态触发器相关,并在两个触发器对应的窗口合并时合并它们的状态,例如在使用会话窗口时。(目前没使用过,了解不多) clear()执行删除相应窗口时所需的任何操作。(一般是删除定义的状态、定时器等) TriggerResult

onElement(),onEventTime(),onProcessingTime()都要求返回一个TriggerResult

TriggerResult包含以下内容

CONTINUE:表示啥都不做。 FIRE:表示触发计算,同时保留窗口中的数据 PURGE:简单地删除窗口的内容,并保留关于窗口和任何触发器状态的任何潜在元信息。 FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。(默认情况下,预先实现的触发器只触发而不清除窗口状态。) 案例 需求 当窗口中的数据量达到一定数量的时候触发计算 根据执行时间每隔一定时间且窗口中有数据触发计算,如果没有数据不触发计算 窗口关闭的时候清除数据 实现过程

案例逻辑图.png

依赖 3.1.1.3.1.0.0-78 1.9.1 2.11 2.11.7 org.scala-lang scala-library ${scala.version} org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-core ${flink.version} 实现代码 //调用 dStream .keyBy(_.event_id) .window(TumblingEventTimeWindows.of(Time.hours(1))) .trigger(new CustomTrigger(10, 1 * 60 * 1000L)) //------------------------------------------------------------------------- package com.meda.demo import java.text.SimpleDateFormat import com.meda.utils.DatePattern import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.common.state.ReducingStateDescriptor import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult} import org.apache.flink.streaming.api.windowing.windows.TimeWindow class CustomTrigger extends Trigger[eventInputDT, TimeWindow] { //触发计算的最大数量 private var maxCount: Long = _ //定时触发间隔时长 (ms) private var interval: Long = 60 * 1000 //记录当前数量的状态 private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long]) //记录执行时间定时触发时间的状态 private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long]) //记录时间时间定时器的状态 private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long]) def this(maxCount: Int) { this() this.maxCount = maxCount } def this(maxCount: Int, interval: Long) { this(maxCount) this.interval = interval } override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { val countState = ctx.getPartitionedState(countStateDescriptor) //计数状态加1 countState.add(1L) //如果没有设置事件时间定时器,需要设置一个窗口最大时间触发器,这个目的是为了在窗口清除的时候 利用时间时间触发计算,否则可能会缺少部分数据 if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) { ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp()) ctx.registerEventTimeTimer(window.maxTimestamp()) } if (countState.get() >= this.maxCount) { //达到指定指定数量 //删除事件时间定时触发的状态 ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get()) //清空计数状态 countState.clear() //触发计算 TriggerResult.FIRE } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) { //未达到指定数量,且没有指定定时器,需要指定定时器 //当前定时器状态值加上间隔值 ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval) //注册定执行时间定时器 ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get()) TriggerResult.CONTINUE } else { TriggerResult.CONTINUE } } // 执行时间定时器触发 override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) { println(s"数据量未达到 $maxCount ,由执行时间触发器 ctx.getPartitionedState(processTimerStateDescriptor).get()) 触发计算") ctx.getPartitionedState(processTimerStateDescriptor).clear() ctx.getPartitionedState(countStateDescriptor).clear() TriggerResult.FIRE } else { TriggerResult.CONTINUE } } //事件时间定时器触发 override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = { if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) { //还有未触发计算的数据 println(s"事件时间到达最大的窗口时间,并且窗口中还有未计算的数据:${ctx.getPartitionedState(countStateDescriptor).get()},触发计算并清除窗口") ctx.getPartitionedState(eventTimerStateDescriptor).clear() TriggerResult.FIRE_AND_PURGE } else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) { //没有未触发计算的数据 println("事件时间到达最大的窗口时间,但是窗口中没有有未计算的数据,清除窗口 但是不触发计算") TriggerResult.PURGE } else { TriggerResult.CONTINUE } } //窗口结束时清空状态 override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = { // println(s"清除窗口状态,定时器") ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get()) ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get()) ctx.getPartitionedState(processTimerStateDescriptor).clear() ctx.getPartitionedState(eventTimerStateDescriptor).clear() ctx.getPartitionedState(countStateDescriptor).clear() } //更新状态为累加值 class Sum extends ReduceFunction[Long] { override def reduce(value1: Long, value2: Long): Long = value1 + value2 } //更新状态为取新的值 class Update extends ReduceFunction[Long] { override def reduce(value1: Long, value2: Long): Long = value2 } }

留下的疑问: 之前看资料的时候好像说定时器只能设置一个,你设置多个它也只会选择一个执行。 但是我这里事件、执行时间定时器都设置,好像都生效了。这点还没看懂。 后续研究下啥情况。

本文为个人原创文章,转载请注明出处。!!!!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3